-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Incoming message interceptors #13641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the behaviour can be reduced to a single behaviour for both incoming and outgoing interceptors and the protocol behaviour can be replaced by a context map.
The formatting is all over the place. Please use a standard editor such as vim or emacs, most come with a reasonable indentation implementation. case
| fun
keywords should line up with their respective end
markers. Try to keep to the 80 char per line limit also.
@@ -0,0 +1,27 @@ | |||
-module(rabbit_protocol_accessor). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be replaced with a map that the intercept function takes.
e.g.
#{protocol => mqtt,
vhost => <<"/">>,
client_id =><<"test">>}
@@ -0,0 +1,38 @@ | |||
-module(rabbit_incoming_message_interceptor). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't think we need to distinguish incoming and outgoing interceptors are the module / behaviour level.
We should have a single behaviour rabbit_message_interceptor
with a single intercept
function that takes the message container, the protocol context map (see another comment) and the config.
Instead they would be distinguined by which configuration they are added to. It is fine if this PR only implements incoming interceptors.
Sounds good to me. The context map could be set on initialization by each protocol accessor (AMQP 0.9.1 channel, AMQP 1.0 session, or MQTT connection process), and then passed to Any additional comments on not using the getters approach and going for both a incoming+outgoing approach, @ansd ? |
c164817
to
c52fb11
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're getting closer I think a few suggested changes and some formatting advise.
MsgDirection:: incoming | outgoing, | ||
Resp :: mc:state(). | ||
intercept(Msg, MsgInterceptorCtx, MsgDirection) -> | ||
InterceptorsList = list_to_atom(atom_to_list(MsgDirection) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to use a tuple key for persistent term than doing this. e.g. {message_interceptors, incoming | outgoing}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs for ad hoc code to be put in rabbit:persist_static_configuration
persist_static_configuration(
[classic_queue_index_v2_segment_entry_count,
classic_queue_store_v2_max_cache_size,
classic_queue_store_v2_check_crc32,
incoming_message_interceptors,
outgoing_message_interceptors
]),
needs to become something like
persist_static_configuration(
[classic_queue_index_v2_segment_entry_count,
classic_queue_store_v2_max_cache_size,
classic_queue_store_v2_check_crc32,
{incoming_message_interceptors, {message_interceptors, incoming}
outgoing_message_interceptors, {message_interceptors, outgoing}
]),
and
persist_static_configuration(Params) ->
lists:foreach(
fun(Param) ->
case application:get_env(?MODULE, Param) of
{ok, Value} ->
ok = persistent_term:put(Param, Value);
undefined ->
ok
end
end, Params).
needs to become
persist_static_configuration(Params) ->
lists:foreach(
fun({Param, Alias}) ->
case application:get_env(?MODULE, Param) of
{ok, Value} ->
ok = persistent_term:put(Alias, Value);
undefined ->
ok
end;
(Param) ->
case application:get_env(?MODULE, Param) of
{ok, Value} ->
ok = persistent_term:put(Param, Value);
undefined ->
ok
end
end, Params).
Is that fine? What about this other option:
The cuttlefish translations could generate a rabbit.message_interceptors.<incoming|outgoing>
config instead of rabbit.<incoming_message_interceptors|outgoing_message_interceptors>
and the rabbit:persist_static_configuration
would only receive message_interceptors
instead of both <incoming|outgoing>_message_interceptors
. The interceptor module would read app conf. with message_interceptors
and then then get either the incoming or the outgoing ones from the read term.
The app conf would look like
[{rabbit, [{message_interceptors, #{incoming => [...], outgoing => [...] ...
instead of
[{rabbit, [...
{incoming_message_interceptors, ...},
{outgoing_message_interceptors,...},
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. that seems complicated. Perhaps just resolve incoming
to incoming_message_interceptors
etc in a function rather than doing atom to list conversions.
resolve(incoming) ->
incoming_message_interceptors;
resolve(outgoing) ->
outgoing_message_interceptors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, calling atom_to_list
for each message that RabbitMQ receives creates lots of garbage, I wouldn't be surprised if performance drops by a few percent.
What Karl suggested is great.
Alternatively, it might be simpler to omit these translations entirely. For example instead of
-spec intercept(Msg, MsgInterceptorCtx, MsgDirection) -> Resp when
Msg :: mc:state(),
MsgInterceptorCtx :: map(),
MsgDirection:: incoming | outgoing,
Resp :: mc:state().
we could have
-spec intercept(Msg, Context, Group) -> Msg when
Msg :: mc:state(),
Context :: map(),
Group :: incoming_message_interceptors | outgoing_message_interceptors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the group option actually. I think it should be it's own argument to intercept/4
though so we don't have to modify the context map for each call.
Interceptors = persistent_term:get(InterceptorsList, []), | ||
lists:foldl(fun({Module, Config}, Msg0) -> | ||
try | ||
Module:intercept(Msg0, MsgInterceptorCtx, Config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel the direction should be passed to the interceptor, you may want to just have a single interceptor for both incoming and outgoing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assumed an interceptor would do the same independently of the direction of the message. It is the user the one that decides (if the interceptor allows it by providing the appropriate message_interceptors.<direction>.
cuttlefish mapping) in which direction should the interceptor be applied, or if it should be applied in both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As suggested by @gomoripeti , the rabbit_message_interceptor
module could also add the message direction in the MsgInterceptorCtx
map. How does that sound?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that sounds good.
end, Msg, Interceptors). | ||
-callback intercept( | ||
Msg :: mc:state(), | ||
MsgInterceptorCtx :: map(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we define some context properties that we'd always pass, e.g. vhost
, username
and the current protocol itself?
e.g.
#{protocol := amqp091 | amqp | mqttv3 | mqttv5, %% tags to be defined
vhost := binary(),
username := binary(),
conn_name => binary() % optional
atom() -> term()}
@kjnilsson Thanks on the formatting advices :) . I'm using neovim and I've tried to find out a proper solution on this, but I haven't found yet a solution that would allow me to format my contributions w/out re-formatting previous code. Will try to figure this out. |
It is an art rather than a science really. I use nvim too and it is fine for indentation - just hit '=' for your selection and it mostly does the right thing. |
Thank you @LoisSotoLopez for this contribution!
The only downside of the context map I see is that we duplicate some state information, for example each MQTT connection has higher memory usage after this PR. However, this context map is cleaner and simpler, so I prefer the context map. Feel free to go with outgoing interceptors as well. |
MQTT could add the context map to a persistent term to avoid this if necessary |
no scrap that, the context map is per vhost / user. |
|
||
-behaviour(rabbit_message_interceptor). | ||
|
||
-define(ANN_CLIENT_ID, <<"client_id">>). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be x-opt-client-id
or x-opt-mqtt-client-id
or x-opt-mqtt-publisher-client-id
such that this message annotation will be received by an AMQP 1.0 client in the message-annotation section.
The current annotation client_id
also gets lost when the MQTT client publishes into a stream. Maybe this key should be a configuration option by the plugin.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
x-opt-mqtt-client-id
gets my vote.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I suppose it's best to include mqtt
so that it's clear that this is an MQTT client ID.
"Client ID" is otherwise a broad term which could for example refer to a JMS client ID (https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#client-identifier) whose purpose is similar though:
The purpose of client identifier is to associate a connection and its objects with a state maintained on behalf of the client by a provider. By definition, the client state identified by a client identifier can be ‘in use’ by only one client at a time.
MQTT could consider building it on the fly to keep memory overhead low |
case cuttlefish_variable:filter_by_prefix("message_interceptors.incoming", Conf) of | ||
[] -> | ||
cuttlefish:unset(); | ||
L -> | ||
InterceptorsConfig = [ | ||
{Module0, Config, Value} | ||
|| {["message_interceptors", "incoming", Module0, Config], Value} <- L | ||
], | ||
{Result, Order0} = lists:foldl( | ||
fun({Interceptor0, Key0, Value}, {Acc, Order}) -> | ||
Interceptor = list_to_atom(Interceptor0), | ||
Key = list_to_atom(Key0), | ||
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end, | ||
% This Interceptor -> Module alias exists for | ||
% compatibility reasons | ||
Module = case Interceptor of | ||
set_header_timestamp -> | ||
rabbit_header_timestamp_interceptor; | ||
set_header_routing_node -> | ||
rabbit_header_routing_node_interceptor; | ||
_ -> | ||
Interceptor | ||
end, | ||
NewAcc = | ||
maps:update_with( | ||
Module, | ||
MapPutFun, | ||
#{Key => Value}, | ||
Acc), | ||
{NewAcc, [Module| Order]} | ||
end, | ||
{#{}, []}, | ||
InterceptorsConfig | ||
), | ||
Order = lists:uniq(Order0), | ||
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order] | ||
end | ||
end | ||
}. | ||
|
||
{translation, "rabbit.outgoing_message_interceptors", | ||
fun(Conf) -> | ||
case cuttlefish_variable:filter_by_prefix("message_interceptors.outgoing", Conf) of | ||
[] -> | ||
cuttlefish:unset(); | ||
L -> | ||
InterceptorsConfig = [ | ||
{Module0, Config, Value} | ||
|| {["message_interceptors", "outgoing", Module0, Config], Value} <- L | ||
], | ||
{Result, Order0} = lists:foldl( | ||
fun({Interceptor0, Key0, Value}, {Acc, Order}) -> | ||
Module = list_to_atom(Interceptor0), | ||
Key = list_to_atom(Key0), | ||
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end, | ||
NewAcc = | ||
maps:update_with( | ||
Module, | ||
MapPutFun, | ||
#{Key => Value}, | ||
Acc), | ||
{NewAcc, [Module| Order]} | ||
end, | ||
{#{}, []}, | ||
InterceptorsConfig | ||
), | ||
Order = lists:uniq(Order0), | ||
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order] | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add unit tests?
deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
is the file to add these tests.
0616c1e
to
11a14e5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nearly there! couple of tweaks
@@ -2607,3 +2609,15 @@ mc_env() -> | |||
MqttX -> | |||
#{mqtt_x => MqttX} | |||
end. | |||
|
|||
build_msg_interceptor_ctx(#state{cfg = #cfg{client_id = ClientId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is great and what we discussed as we think lower memory overhead is preferrable. It would be good if it at all possible you could run some throughput tests so we can get a view on how much this might affect performance. https://www.rabbitmq.com/blog/2023/03/21/native-mqtt#latency-and-throughput
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Run the benchmarks against the last commit (build on publish) and with the last one commit building the context map on init (build on init).
On an Arch Linux w/ 10 cores and 24 GB of RAM.
QOS1 build on publish
================= TOTAL PUBLISHER (100) =================
Total Publish Success Ratio: 100.000% (1000000/1000000)
Total Runtime (sec): 63.151
Average Runtime (sec): 62.670
Pub time min (ms): 0.134
Pub time max (ms): 39.215
Pub time mean mean (ms): 6.258
Pub time mean std (ms): 0.044
Average Bandwidth (msg/sec): 159.574
Total Bandwidth (msg/sec): 15957.402
================= TOTAL SUBSCRIBER (100) =================
Total Forward Success Ratio: 100.000% (1000000/1000000)
Forward latency min (ms): 0.126
Forward latency max (ms): 36.001
Forward latency mean std (ms): 0.020
Total Mean forward latency (ms): 3.128
QOS0 build on publish
================= TOTAL PUBLISHER (100) =================
Total Publish Success Ratio: 100.000% (1000000/1000000)
Total Runtime (sec): 1.352
Average Runtime (sec): 0.966
Pub time min (ms): 0.002
Pub time max (ms): 52.091
Pub time mean mean (ms): 0.036
Pub time mean std (ms): 0.014
Average Bandwidth (msg/sec): 14429.931
Total Bandwidth (msg/sec): 1442993.119
================= TOTAL SUBSCRIBER (100) =================
Total Forward Success Ratio: 75.687% (756868/1000000)
Forward latency min (ms): 7.968
Forward latency max (ms): 4167.565
Forward latency mean std (ms): 345.354
Total Mean forward latency (ms): 1803.073
QOS1 build on init
================= TOTAL PUBLISHER (100) =================
Total Publish Success Ratio: 100.000% (1000000/1000000)
Total Runtime (sec): 63.087
Average Runtime (sec): 62.568
Pub time min (ms): 0.142
Pub time max (ms): 31.975
Pub time mean mean (ms): 6.248
Pub time mean std (ms): 0.054
Average Bandwidth (msg/sec): 159.839
Total Bandwidth (msg/sec): 15983.907
================= TOTAL SUBSCRIBER (100) =================
Total Forward Success Ratio: 100.000% (1000000/1000000)
Forward latency min (ms): 0.138
Forward latency max (ms): 19.824
Forward latency mean std (ms): 0.025
Total Mean forward latency (ms): 3.112
QOS0 build on init
================= TOTAL PUBLISHER (100) =================
Total Publish Success Ratio: 100.000% (1000000/1000000)
Total Runtime (sec): 1.423
Average Runtime (sec): 0.961
Pub time min (ms): 0.002
Pub time max (ms): 38.604
Pub time mean mean (ms): 0.030
Pub time mean std (ms): 0.008
Average Bandwidth (msg/sec): 12087.794
Total Bandwidth (msg/sec): 1208779.411
================= TOTAL SUBSCRIBER (100) =================
Total Forward Success Ratio: 75.167% (751674/1000000)
Forward latency min (ms): 1.671
Forward latency max (ms): 4043.116
Forward latency mean std (ms): 207.787
Total Mean forward latency (ms): 1765.429
I get that the under-100% Total Forward Success Ratio on subscribers for QOS 0 comes from me using not a boxed environment + reduced RAM compared to the one used on the doc page you linked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So unexpectedly the qos0 test is faster creating the map dynamically?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems it is, but does not really make too much sense to me. I'll repeat this benchmark on a dedicated machine just in case after addressing the squashed commit writing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me, a couple of lines to indent with =
.
Then just squash all commits and write a nice single commit for the feature and update PR description and we can merge I think.
-module(rabbit_mqtt_message_interceptor_client_id). | ||
|
||
-behaviour(rabbit_message_interceptor). | ||
|
||
-export([intercept/4]). | ||
|
||
intercept(Msg, | ||
#{client_id := ClientId}, | ||
incoming_message_interceptors, | ||
#{annotation_key := AnnotationKey} | ||
) -> | ||
rabbit_message_interceptor:set_msg_annotation(Msg, | ||
AnnotationKey, | ||
ClientId, | ||
true); | ||
intercept(Msg, _MsgInterceptorCtx, _Group, _Config) -> | ||
Msg. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this file is untested?
Adding the following simple test would be good enough:
- MQTT client publishes a single message.
- Different clients (MQTT, AMQP 1.0, AMQP 0.9.1) receive this message validating that it contains the MQTT client ID of the publisher.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... let me know if you have difficulties adding such a test case (I can add it as well)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you can see from my last commit, I'm struggling to get the published message with the amqp10_client
. If you want to finish that up, I won't come back to this until tomorrow ~0830.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will add a test.
This commit enables users to provide custom message interceptor modules, i.e. modules to process incoming and outgoing messages. The `rabbit_message_interceptor` behaviour defines a `intercept/4` callback, for those modules to implement. Co-authored-by: Péter Gömöri <[email protected]>
5e98e88
to
8acdc65
Compare
efe59e8
to
c7c7cd1
Compare
It might be better to leave full support for outgoing interceptors out of this PR. It lays the groudwork for those interceptors, but triggering them is not included in this PR to not grow the scope. Wdyt? |
That's okay, we can add them at a later point. |
@@ -43,7 +43,7 @@ export BUILD_WITHOUT_QUIC | |||
|
|||
LOCAL_DEPS = ssl | |||
DEPS = ranch rabbit amqp10_common | |||
TEST_DEPS = cowlib emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management amqp_client rabbitmq_consistent_hash_exchange rabbitmq_amqp_client rabbitmq_stomp rabbitmq_stream rabbitmq_federation | |||
TEST_DEPS = cowlib emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management amqp_client rabbitmq_consistent_hash_exchange rabbitmq_amqp_client rabbitmq_stomp rabbitmq_stream rabbitmq_federation amqp10_client |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this addition is necessary because rabbitmq_amqp_client
already depends on amqp10_client
.
Proposed Changes
This PR enables users to provide custom message interceptor modules,
i.e. modules to process incoming and outgoing messages. The
rabbit_message_interceptor
behaviour defines aintercept/4
callback,for those modules to implement.
The
intercept/4
callback receives aMsg
; a context map provided bythe protocol-specific module that invoked the interception; a
Group
specifying whether
Msg
is a incoming or a outgoing message; and aconfiguration map specific to the interceptor.
Some of those protocol-specific modules that invoke the interception and
provide the context map are
rabbit_channel
,rabbit_amqp_session
andrabbit_mqtt_processor
. Themsg_interceptor_ctx()
type makesmandatory to provide certain information on the context map, such as the
protocol used to send/get the message, the virtual host where it
arrived/departed or the username publishing/getting the message. Other
pieces of information such as the name of the connection used to
publish/get the message may be added to the context map but are not
mandatory, so users providing custom interceptors should be aware of
this when implementing such modules.
For an interceptor to be invoked it needs to be properly configured
through the
.conf
file.For just enabling an interceptor it would be enough to add a single line
to the
.conf
file. For an interceptor module namedfoo_mod.erl
thatline would be:
Notice this configuration line only enables the module as an interceptor
for incoming messages. To also make the same module intercept outgoing
messages two lines are needed.
Thid last two-line configuration will generate an Erlang configuration
like this:
The empty map after
foo_mod
is the configuration map that will bereceived by the
invoke/4
function as fourth argument. To populate thismap users can add more lines to the
.config
file. For example:would result on a config like the following.
The configuration map can be different for the same interceptor used as
incoming or outgoing interceptor. Also, the
enabled
key never getsadded to the resulting configuration map. In fact, if other keys are
used for the same group (incoming or outgoing) you don't need to specify
the
enabled
key. Therefore the configuration above could be simplifiedto:
Users providing custom interceptors should also provide the Cuttlefish
schemas providing mappings for the configuration parameters of their
interceptors. This commit already provides a Cuttlefish translation
for those parameters. Therefore, those user-provided mappings should
keep the expected format:
For compatibility reasons, an alias can be used in
<interceptor_module_name>
instead of the actual module name, but onlyfor the interceptors that already existed in RabbitMQ before this
commit. The
set_header_timestamp
andset_header_routing_node
aliasescan be used as
<interceptor_module_name>
. However, using the actualmodule names, wich are
rabbit_message_interceptor_timestamp
andrabbit_message_interceptor_routing_node
respectively is recommended.The order in which the interceptors are invoked is the same as the order
in which they appear in the
.conf
file. For instance, the followingconfig lines
would result in the following Erlang configuration
and the following order of invocation for incoming messages:
mod3 (first) -> mod1 -> mod2 (last)
.Types of Changes
Checklist
CONTRIBUTING.md
documentFurther Comments